Add opt-in query pipelining#3652
Conversation
Allow multiple queries to be sent on the wire before waiting for responses, reducing round-trip latency. Enabled via client.pipelining = true. Each query gets its own Sync boundary so errors are isolated. Tracks in-flight named statements (submittedNamedStatements) to prevent duplicate Parse messages when pipelining queries with the same prepared statement name. Handles error/disconnect cleanup for the sent queue.
- Clean up submittedNamedStatements on error in _handleErrorMessage to prevent stale entries from blocking future re-preparation of the same named statement after a parse failure - Guard _pulsePipelinedQueryQueue against non-queryable connections - Fix cancel() and readTimeout for sent queries: removing an already-sent query from _sentQueryQueue corrupts the pipeline response mapping since the server will still respond to it; no-op the callback instead - Add bench-pipelining.js comparing serial vs pipelined throughput
Gate _sentQueryQueue activation on readyForQuery=true inside _pulsePipelinedQueryQueue (and remove the redundant promotion block from _handleReadyForQuery) to eliminate the microtask/macrotask race where the next query could be activated as _activeQuery before the error's ReadyForQuery arrived, causing that RFQ to be handled by the wrong query. Also adds the error-listener fix for the query_timeout integration test so the expected stream-destroy doesn't leak as an unhandled 'error'.
- New features/pipelining.mdx documenting the opt-in flag - Client and Pool API reference updated - Pool accepts `pipelining: true` and sets it on every client it creates
This comment was marked as outdated.
This comment was marked as outdated.
|
@nigrosimone do you know why they are stuck? cc @brianc how do you want to proceed? |
This comment was marked as resolved.
This comment was marked as resolved.
- pg-native: handle PGRES_PIPELINE_SYNC/PGRES_PIPELINE_ABORTED in _emitResult; add pipeline() batch method using libpq 14+ pipeline mode (enterPipelineMode, pipelineSync, exitPipelineMode) - pg-native: bump libpq dependency to ^1.9.0 (has pipeline bindings) - native client: add _pulsePipelinedQueryQueue that batches all queued queries through pg-native pipeline(), delivering results per-query - native client: suppress queue length deprecation when pipelining
This comment was marked as resolved.
This comment was marked as resolved.
… benchmarks Pipeline mode requires sendQueryParams (extended query protocol), not sendQuery (simple query protocol). PostgreSQL rejects PQsendQuery in pipeline mode. Benchmark script now tests all four combinations: JS serial, JS pipelined, native serial, and native pipelined.
The native client end() was immediately terminating the connection, causing "Connection terminated" errors for queries still in the pipeline. Now waits for the drain event before closing when pipelining is active. Also fix pipeline mode to use sendQueryParams instead of sendQuery, since PostgreSQL rejects simple query protocol in pipeline mode.
…nd skip JS-only tests handleError in native/query.js crashes when this.native is undefined (e.g. query_timeout fires before pipeline callback sets it). Skip named statement cleanup and query_timeout tests for native client since those features rely on JS-specific internals.
|
@brianc what would be the next steps here? |
|
Just gotta review it & get it merged in, friend! Maybe tomorrow, definitely by Wendesday! |
|
Is the plan not still to replace the query queue with pipelining in pg 9? |
|
yo sorry for the massive delay on my side here, i've been on sabbatical and really not much computer interaction for the past 3-4 months. It's been pretty nice, but things have naturally backed up for me w/ pg. I'm recitifying that now. I'll pull down this PR & play with it today, but from initial inspection it looks shippable & I'm looking forward to getting this out ASAP (hopefully right after I review it provided there aren't problems). |
|
Okay so the only thing I'm seeing here is the api of const client = new Client({ host: 'foobar.com', pipelining: true }) |
Adds opt-in query pipelining via
client.pipelining = true(ornew Pool({ pipelining: true })).Pipelined queries are sent to the server without waiting for previous responses, using PostgreSQL's extended query protocol with per-query
Syncfor error isolation. ~2-3x throughput on batches of simple queries locally.pg— three-queue state machine (_queryQueue→_sentQueryQueue→_activeQuery), named-statement dedup across in-flight Parse, gracefulend(),query_timeoutunblocks the pipelinepg-pool—pipeliningoption sets the flag on every client the pool creates